BigQueryのSparkストアドプロシージャでGCSのファイルを操作する(プレビュー)
データアナリティクス事業本部、池田です。
少し前に、BigQuery上で Apache Spark
でストアドプロシージャを構築できる機能が プレビュー
となりました。
【 BigQuery で無制限のワークロードを構築: SQL 以外の言語を使用した新機能 】
Spark in BigQueryが発表?BigQueryの画面でSparkジョブを書いて、BigQueryのストアドプロシージャとしてSparkを実行。料金はBigQueryのスロットに統合。Pythonで書きたい処理をストアド化したり、非構造化データやクレンジングもBigQueryで出来る。https://t.co/yos3CZ42ca#GoogleCloudNext pic.twitter.com/XX9qXVVoJz
— Yuta.H (@yutah_3) October 11, 2022
今回は、このSparkのストアドプロシージャでCloud Storage(以下GCS)に配置されたファイルを操作してみます。
この機能は、執筆時点(2022/11/07)では申請が必要なプレビューとなります。 フォームからの申請が必要でしたが、 私の場合は2022/10/14に簡単な入力で申請をして、 2022/10/19には承認メールが届き利用できました。
実装については、以下の公式のガイドを参考に進めました。
【 Work with stored procedures for Apache Spark 】
メタストアやSpark 履歴サーバーといった機能もあるようですが、 今回は触りません… シンプル?な「GCS上のCSVをJSONにして書き出すパターン」と 「GCS上のCSVをBigQueryに取り込む時にファイルパスをカラムとして追加するパターン」 をまずは実装してみました。
※当ブログの以下の操作は、プロジェクトのオーナー権限を持つアカウントで、 コマンドは Cloud Shell 、 SQLはBigQueryのコンソールで実行しています。
接続の作成
BigQuery Connection API
を有効化します。
bqコマンドでUSマルチリージョンにSparkの接続を作成します。
bq mk --connection --connection_type='SPARK' \ --location=US \ spark-conn
接続に紐づくサービスアカウントが必要になるので、bqコマンドで取得します。
bq show --connection us.spark-conn
この結果が↓
Connection {プロジェクトID}.us.spark-conn name friendlyName description Last modified type hasCredential properties ---------------------------------- -------------- ------------- ----------------- ------- --------------- ---------------------------------------------------------------------------------------------- {プロジェクト番号}.us.spark-conn 07 Nov 01:30:17 SPARK False {"serviceAccountId": "{プロジェクト番号を含む文字列}@gcp-sa-bigquery-consp.iam.gserviceaccount.com"}
↑一番右にサービスアカウントがありました。
↓接続作成後、コンソール上だとこんな感じ。
こちらでもサービスアカウントは確認できそうです。
この接続を使って、2パターンのストアドプロシージャを作成します。
ストアドプロシージャ1:GCS上のCSVをJSONにして書き出す
なんとなくのイメージでGCS上のファイルを扱うことが多そうな気がしたので、
GCSを絡めたインプットとアウトプットを試してみます。
また、呼び出し時のパラメータの渡し方も確認してみます。
※ただGCSと読み書きするだけであれば、 LOAD DATA
や EXPORT DATA
の
構文などで十分
です。実際の利用時にはSparkでやりたい処理が間に入るイメージかなと思います。
実装
まず、ストアドプロシージャはデータセット配下に作成するので、 データセットを作成します。
CREATE SCHEMA my_spark OPTIONS ( location = 'US' );
あとはストアドプロシージャを作成します。
GCS上に.pyファイルを置く方法もあるようですが、
今回はインライン コードで記述しています。
内容は、引数で指定されたGCSバケットのCSVファイルをロードし、別パスにJSONとして書き出すだけです。
CREATE PROCEDURE my_spark.my_spark_gcs_sample(sample STRING, bucket STRING) WITH CONNECTION `us.spark-conn` OPTIONS ( engine="SPARK" ) LANGUAGE PYTHON AS R""" import os from pyspark.sql import SparkSession # 引数出力 print("PROCEDURE START. param: {}".format(os.environ["BIGQUERY_PROC_PARAM.sample"].strip("'\""))) bucket_name = os.environ["BIGQUERY_PROC_PARAM.bucket"].strip("'\"") # セッションを作成 spark = SparkSession.builder.appName("my_spark_sample").getOrCreate() # GCSを参照 df = spark.read.option("header",True).csv(f"gs://{bucket_name}/inputs/") # データ表示 df.show() # GCS出力 df.write.json(f"gs://{bucket_name}/outputs/") """ ;
呼び出し時の引数は、
os.environ["BIGQUERY_PROC_PARAM.{引数名}"]
のように参照できるそうですが、
私の場合文字列が " "
で括られてしまっていたので、
strip()
を噛ませています。
readのcsv() でファイルをロードし、 writeのjson() でそのまま書き出しています。
↓コンソール上はこんな感じ。
このパターンの場合、接続のサービスアカウントにGCSバケットを扱う権限が必要になるので、
前述の公式ガイドに倣って、 roles/storage.objectAdmin
を付与します。
gsutil iam ch serviceAccount:{サービスアカウント}:objectAdmin \ gs://{バケット名}
この権限が無いと呼び出し時に Permission 'storage.objects.get' denied
みたいなエラーになりました。
ロールの扱いだけ上手くできれば、割とさくっと実装できました。
実行
インプットのファイルは3行ほどのヘッダー有りのCSVです。
col1,col2,col3 aaa,123,true bbb,456,false
CALL
で引数を渡しつつ呼び出します。
CALL my_spark.my_spark_gcs_sample('Sparkのストアドプロシージャを使ってみる', '{バケット名}');
↑ print()
や df.show()
の結果はBigQueryのコンソールからも確認できるようです。便利!
↓完了すると、成功を示すと思われる _SUCCESS
という空ファイルと、
データのJSONファイルが出力されていました。
{"col1":"aaa","col2":"123","col3":"true"} {"col1":"bbb","col2":"456","col3":"false"}
私の場合はコンソール上での実行から100秒くらいで完了になりました。 ※あくまでプレビュー段階です。
ストアドプロシージャ2:GSC上のCSVをBigQueryに取り込む時にファイルパスをカラムとして追加する
もう一つのパターンは、 BigQueryのテーブルにロードするのですが、 どのファイルからロードされたのか分かるように、 元のファイルのパスをカラムとして追加してみます。
実装
内容は、引数で指定されたGCSバケットのCSVファイル群を、 ファイルパスをデータセットにカラムとして追加してから、 別の引数として指定されたBigQueryテーブルに出力します。
CREATE PROCEDURE my_spark.my_spark_import_sample(bucket STRING, table STRING) WITH CONNECTION `us.spark-conn` OPTIONS ( engine="SPARK" ) LANGUAGE PYTHON AS R""" import os from pyspark.sql import SparkSession from pyspark.sql.functions import input_file_name # 引数 bucket_name = os.environ["BIGQUERY_PROC_PARAM.bucket"].strip("'\"") table_name = os.environ["BIGQUERY_PROC_PARAM.table"].strip("'\"") # セッションを作成 spark = SparkSession.builder.appName("my_spark_import_sample").getOrCreate() # GCSを参照 df = spark.read.option("header",True).csv(f"gs://{bucket_name}/inputs/") # パスを設定 df = df.withColumn("uri", input_file_name()) # BQへ格納 df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save(table_name) """ ;
input_file_name()
でさくっと取れてしまうんですね…知りませんでした…
このパターンの場合、接続のサービスアカウントに前章のGCSの権限に加えて、
BigQueryを扱う権限が必要になるので、
前述の公式ガイドに倣って、取込み先のデータセットで roles/bigquery.admin
を付与します。
GRANT `roles/bigquery.admin` ON SCHEMA my_spark TO 'serviceAccount:{サービスアカウント}';
これが無いと、実行時に Access Denied
的なことを言われます。
実行
インプットのファイルは、前章の3行のCSVを複製して、 2ファイルをロードしてみます。
実行してみます。(事前にテーブルは作成していません。)
CALL my_spark.my_spark_import_sample('{バケット名}', 'my_spark.imported');
↑ファイルのパスがカラムとして追加されて、ロードできています!
私の場合はコンソール上での実行から2分弱くらいで完了になりました。 ※あくまでプレビュー段階です。
おわりに
いろいろ使えそうな機能ですね。
今回使わなかった機能や、 データ量が多い時の所要時間なども気になりますね。 気が向いたらブログにします。